<?php
namespace Swork\Process;

use PhpAmqpLib\Message\AMQPMessage;
use Swork\Bean\Holder\InstanceHolder;
use Swork\Client\Amqp;
use Swork\Queue\AmqpArgument;
use Swork\Service;

/**
 * AMQP consumer process.
 *
 * Registers the configured queue-task methods as AMQP consumers on a single
 * channel and blocks while dispatching incoming messages to them.
 *
 * Class QueueTaskProcess
 * @package Swork\Process
 */
class QueueTaskProcess implements ProcessHandlerInterface
{
    /**
     * Run the consumer, restarting it after a short pause whenever it stops.
     *
     * The retry is an iterative loop rather than a recursive call: recursion
     * would add one stack frame per restart that is never released.
     *
     * @param \swoole_websocket_server $server
     * @param \swoole_process $process
     * @param array $exts Task holder data passed in, shaped {cls => [], cls => []}
     */
    public function run(\swoole_websocket_server $server, \swoole_process $process, array $exts = [])
    {
        // Retry mechanism: never let the consumer die for good.
        while (true)
        {
            try
            {
                $this->consume($server, $process, $exts);
            }
            catch (\Throwable $ex)
            {
                try
                {
                    Service::$logger->error("QueueTaskProcess [{$ex->getMessage()} - worker_id:{$server->worker_id},process_id:{$process->id}] - [{$ex->getCode()}]");
                }
                catch (\Throwable $logEx)
                {
                    // A failing logger must not break the retry loop.
                }
            }

            // Back off for 500ms before consuming again.
            usleep(500000);
        }
    }

    /**
     * Consume the queues described by $exts until the channel stops consuming.
     *
     * The channel is closed and the pooled connection is released on every
     * exit path, including exceptions, so that repeated retries from run()
     * do not keep taking connections from the pool without returning them.
     *
     * @param \swoole_websocket_server $server
     * @param \swoole_process $process
     * @param array $exts Task holder data passed in, shaped {cls => [], cls => []}
     * @throws \ErrorException
     */
    private function consume(\swoole_websocket_server $server, \swoole_process $process, array $exts)
    {
        // Fetch the Amqp client; instanceof is already false for null/false.
        $amqp = InstanceHolder::getClass(Amqp::class);
        if (!($amqp instanceof Amqp))
        {
            UserProcess::closeToExit();
            return;
        }

        // Borrow a connection from the pool.
        $pool = $amqp->getPool();
        $conn = $pool->getConnection();
        $channel = null;

        try
        {
            $connect = $amqp->getConnection($conn);
            if (!$connect)
            {
                UserProcess::closeToExit();
                return;
            }

            // Open the channel.
            $channel = $connect->channel();

            // Prefetch one message at a time; the setting does not depend on
            // the consumer, so it is sent once instead of once per consumer.
            $channel->basic_qos(null, 1, null);

            // Walk every task class and register its consumers.
            foreach ($exts as $cls => $items)
            {
                foreach ($items as $name => $args)
                {
                    // Queue name.
                    $queue = $args['queue'];

                    // Declare the queue (durable, not exclusive, not auto-deleted).
                    $channel->queue_declare($queue, false, true, false, false, false);

                    // Resolve the handler instance; skip if unavailable.
                    $inc = InstanceHolder::getClass($cls);
                    if (!$inc)
                    {
                        continue;
                    }

                    // Consumer options; flags are enabled by the value 'true'.
                    $consumer_tag = $args['consumer_tag'] ?? '';
                    $no_local = ($args['no_local'] ?? '') == 'true';
                    $no_ack = ($args['no_ack'] ?? '') == 'true';
                    $exclusive = ($args['exclusive'] ?? '') == 'true';
                    $nowait = ($args['nowait'] ?? '') == 'true';

                    // Register the callback.
                    $callback = $this->buildCallback($inc, $name, $server, $channel, $no_ack);
                    $channel->basic_consume($queue, $consumer_tag, $no_local, $no_ack, $exclusive, $nowait, $callback);
                }
            }

            // Block the process while consumers are registered.
            while ($channel->is_consuming())
            {
                $channel->wait();
            }
        }
        finally
        {
            // Close the channel; a failure here (e.g. the connection is
            // already gone) is logged and must not prevent the release below
            // or replace the exception that brought us here.
            if ($channel !== null)
            {
                try
                {
                    $channel->close();
                }
                catch (\Throwable $closeEx)
                {
                    Service::$logger->error("QueueTaskProcess channel close failed [{$closeEx->getMessage()}] - [{$closeEx->getCode()}]");
                }
            }

            // Hand the connection back to the pool.
            $amqp->releaseConnection($conn);
        }
    }

    /**
     * Build the per-message callback for one consumer.
     *
     * The callback invokes $inc->$name() with an AmqpArgument. When
     * acknowledgement is required ($no_ack is false) a strict `true` result
     * acks the message and a strict `false` result, or any exception thrown
     * by the handler, rejects it with requeue. Any other result is neither
     * acked nor rejected here.
     * NOTE(review): presumably such handlers ack through the AmqpArgument
     * themselves - confirm.
     *
     * @param object $inc Handler instance
     * @param string|int $name Handler method name (key from the task holder data)
     * @param \swoole_websocket_server $server
     * @param mixed $channel Channel the message is acked/rejected on
     * @param bool $no_ack Whether the consumer was registered without acknowledgement
     * @return \Closure
     */
    private function buildCallback($inc, $name, $server, $channel, $no_ack)
    {
        return function (AMQPMessage $message) use ($inc, $name, $server, $channel, $no_ack) {

            // Argument handed to the task method.
            $argument = new AmqpArgument($server, $channel, $message);

            // Run the queue task.
            $result = null;
            try
            {
                $result = $inc->$name($argument);
            }
            catch (\Throwable $ex)
            {
                // Treat a failure as "requeue" (only matters when acks are on).
                $result = false;

                // Log the failure.
                $merge = [
                    'ECODE' => $ex->getCode(),
                    'FILE' => $ex->getFile(),
                    'LINE' => $ex->getLine()
                ];
                Service::$logger->error($ex->getMessage(), $merge);
            }
            finally
            {
                // Only when the message has to be acknowledged.
                if (!$no_ack)
                {
                    // Put the message back on the queue.
                    if ($result === false)
                    {
                        $channel->basic_reject($message->getDeliveryTag(), true);
                    }

                    // Confirm the message as consumed.
                    if ($result === true)
                    {
                        $channel->basic_ack($message->getDeliveryTag());
                    }
                }
            }
        };
    }
}
